线性模型案例二 例3.5
上图即为例3.5在算盘中的示例
视频教程:https://www.yuque.com/docs/share/161ee5fd-1a24-49bb-bfed-fd317d7e5c37#
示例项目大致分为5个部分
- 第一部分:CSV上传,上传CSV数据文件,这里的数据即为西瓜数据集3.0α
- 第二部分:图中的标签编码部分,将数据中的字符串列进行编码,比如将'好瓜'列中的'好','坏'变为0,1
- 第三部分:图中的线性判别分析实现节点,手动用python实现了线性判别分析算法,代码如下:
- lda.py
import warnings

import numpy as np
from sklearn.metrics import accuracy_score
def _class_means(X, y):
classes, y = np.unique(y, return_inverse=True)
cnt = np.bincount(y)
means = np.zeros(shape=(len(classes), X.shape[1]))
np.add.at(means, y, X)
means /= cnt[:, None]
return means
class LinearDiscriminantAnalysis(object):
    """Linear Discriminant Analysis classifier (SVD solver), binary only.

    A hand-written re-implementation of LDA following scikit-learn's ``svd``
    solver: data is whitened by the within-class standard deviations, the
    within- and between-class scatter are diagonalized via two SVDs, and the
    resulting projection yields linear decision scores.

    Attributes set by :meth:`train`:
        priors_    -- empirical class priors.
        means_     -- per-class feature means.
        xbar_      -- prior-weighted overall mean.
        scalings_  -- projection to the discriminant subspace.
        coef_, intercept_ -- linear decision function (binary-collapsed).
    """

    def __init__(self):
        # Hard-coded binary problem: labels are expected to be 0 and 1.
        self.classes_ = np.array([0, 1])
        # Singular values below this threshold are treated as zero (rank cut).
        self.tol = 1e-4

    def train(self, X, y):
        """Fit the model on ``X`` (n_samples, n_features) and labels ``y``.

        Returns ``self`` to allow fluent use.
        """
        n_samples, _ = X.shape
        n_classes = len(self.classes_)
        _, y_t = np.unique(y, return_inverse=True)  # non-negative ints
        self.priors_ = np.bincount(y_t) / float(len(y))
        if not np.isclose(self.priors_.sum(), 1.0):
            # BUG FIX: the original called an undefined ``logger`` with a
            # warnings.warn-style signature (UserWarning as a positional
            # argument); use the stdlib warnings module instead.
            warnings.warn("The priors do not sum to 1. Renormalizing", UserWarning)
            self.priors_ = self.priors_ / self.priors_.sum()
        # Maximum number of components no matter what n_components is
        # specified:
        max_components = min(len(self.classes_) - 1, X.shape[1])
        self._max_components = max_components
        self._solve_svd(X, y)
        if self.classes_.size == 2:  # treat binary case as a special case
            # Collapse the two per-class linear functions into one
            # (score > 0  <=>  class 1).
            self.coef_ = np.array(
                self.coef_[1, :] - self.coef_[0, :], ndmin=2, dtype=X.dtype
            )
            self.intercept_ = np.array(
                self.intercept_[1] - self.intercept_[0], ndmin=1, dtype=X.dtype
            )
        return self

    def predict(self, X):
        """Predict class labels for the rows of ``X``."""
        scores = self._decision_function(X)
        if len(scores.shape) == 1:
            # BUG FIX: ``np.int`` was removed in NumPy 1.24; the builtin
            # ``int`` is the documented replacement.
            indices = (scores > 0).astype(int)
        else:
            indices = scores.argmax(axis=1)
        return self.classes_[indices]

    def evaluate(self, X, y):
        """Return the accuracy of :meth:`predict` on ``(X, y)``."""
        return accuracy_score(y, self.predict(X))

    def _solve_svd(self, X, y):
        """SVD-based solver: sets means_, xbar_, scalings_, coef_, intercept_."""
        n_samples, _ = X.shape
        n_classes = len(self.classes_)
        # Per-class means, computed inline (scatter-add then normalize) so the
        # class has no module-level helper dependency.
        labels, y_idx = np.unique(y, return_inverse=True)
        means = np.zeros((labels.size, X.shape[1]))
        np.add.at(means, y_idx, X)
        self.means_ = means / np.bincount(y_idx)[:, None]
        Xc = []
        for idx, group in enumerate(self.classes_):
            Xg = X[y == group, :]
            Xc.append(Xg - self.means_[idx])
        self.xbar_ = np.dot(self.priors_, self.means_)
        Xc = np.concatenate(Xc, axis=0)
        # 1) within (univariate) scaling by within-class std-dev
        std = Xc.std(axis=0)
        # avoid division by zero in normalization
        std[std == 0] = 1.0
        fac = 1.0 / (n_samples - n_classes)
        # 2) Within variance scaling
        X = np.sqrt(fac) * (Xc / std)
        # SVD of centered (within-)scaled data
        U, S, V = np.linalg.svd(X, full_matrices=False)
        rank = np.sum(S > self.tol)
        # Scaling of within covariance is: V' 1/S
        scalings = (V[:rank] / std).T / S[:rank]
        # 3) Between variance scaling
        # Scale weighted centers
        X = np.dot(
            (
                (np.sqrt((n_samples * self.priors_) * fac))
                * (self.means_ - self.xbar_).T
            ).T,
            scalings,
        )
        # Centers live in a space with at most n_classes-1 dimensions; use SVD
        # to find the projection spanned by the class centers.
        _, S, V = np.linalg.svd(X, full_matrices=False)
        self.explained_variance_ratio_ = (S ** 2 / np.sum(S ** 2))[
            : self._max_components
        ]
        rank = np.sum(S > self.tol * S[0])
        self.scalings_ = np.dot(scalings, V.T[:, :rank])
        coef = np.dot(self.means_ - self.xbar_, self.scalings_)
        self.intercept_ = -0.5 * np.sum(coef ** 2, axis=1) + np.log(self.priors_)
        self.coef_ = np.dot(coef, self.scalings_.T)
        self.intercept_ -= np.dot(self.xbar_, self.coef_.T)

    def _decision_function(self, X):
        """Raw linear scores; raises ValueError on a feature-count mismatch."""
        n_features = self.coef_.shape[1]
        if X.shape[1] != n_features:
            raise ValueError(
                "X has %d features per sample; expecting %d" % (X.shape[1], n_features)
            )
        scores = X @ self.coef_.T + self.intercept_
        # Binary case: flatten the single score column to a 1-D array.
        return scores.ravel() if scores.shape[1] == 1 else scores
- main.py
from sklearn.model_selection import train_test_split
import suanpan
from suanpan.app import app
from suanpan.app.arguments import Csv, Json, ListOfString, String
from suanpan.log import logger
from lda import LinearDiscriminantAnalysis
@app.input(Csv(key="inputData1"))
@app.param(ListOfString(key="param1", alias="featureColumns"))
@app.param(String(key="param2", alias="labelColumn"))
@app.output(Json(key="outputData1"))
def LinearDiscriminantAnalysisImplmentation(context):
    """suanpan node: fit the hand-written LDA on the uploaded CSV.

    Reads the feature/label columns named in the node parameters, holds out a
    third of the rows as a test split, trains the custom LDA, logs the test
    predictions and returns the test accuracy as JSON.
    """
    args = context.args
    frame = args.inputData1
    features = frame[args.featureColumns].values
    labels = frame[args.labelColumn].values
    # Fixed seed keeps the split (and reported accuracy) reproducible.
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.33, random_state=42
    )
    model = LinearDiscriminantAnalysis()
    model.train(X_train, y_train)
    accuracy = model.evaluate(X_test, y_test)
    logger.info("Predicted Results: {}".format(model.predict(X_test)))
    return {"accuracy": accuracy}
# Entry point: hand control to the suanpan runtime, which hosts the app.
if __name__ == "__main__":
    suanpan.run(app)
- 单独运行该节点,然后点击节点上的代码编辑入口,即可进入 VSCode 查看、修改代码
- 该节点会输出分类的准确率
- 第四部分,图中的线性判别分析(现成组件)部分,直接使用了算盘中已有的线性判别分析分类组件
- 第五部分,图中的二次判别分析部分,直接使用了算盘中已有的二次判别分析分类组件